package com.atguigu.flink.watermark;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/2/28
 *
 * <p>Demo job that prints the watermark seen by a keyed process operator while the
 * pipeline runs with a default parallelism of 2 and operator chaining disabled.
 *
 * <p>Pipeline: socket text source -> {@link WaterSensorMapFunction} -> timestamp and
 * watermark assignment -> keyBy sensor id -> watermark-printing process function -> print sink.
 */
public class Demo5_MultiParilism
{
    /**
     * Builds the pipeline and runs it.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = createEnvironment();
        WatermarkStrategy<WaterSensor> strategy = monotonousStrategy();

        env
            .socketTextStream("hadoop103", 8888)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(strategy)
            .keyBy(WaterSensor::getId)
            .process(new WatermarkPrinter())
            // WatermarkPrinter never writes to its collector, so this sink receives no records.
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Failures are only reported on stderr; main still returns normally.
            e.printStackTrace();
        }
    }

    /**
     * Creates the execution environment used by the demo: "rest.port" set to 3333,
     * operator chaining disabled, auto-watermark interval 2000 and parallelism 2.
     */
    private static StreamExecutionEnvironment createEnvironment() {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 3333);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.disableOperatorChaining();
        // Periodic watermark emission interval (milliseconds per the Flink API).
        env.getConfig().setAutoWatermarkInterval(2000);
        env.setParallelism(2);
        return env;
    }

    /**
     * Watermark strategy for monotonously increasing timestamps, reading the event
     * time from {@code WaterSensor.getTs()}.
     * NOTE(review): presumably getTs() is epoch milliseconds - confirm against WaterSensor.
     */
    private static WatermarkStrategy<WaterSensor> monotonousStrategy() {
        return WatermarkStrategy
            .<WaterSensor>forMonotonousTimestamps()
            .withTimestampAssigner((sensor, recordTs) -> sensor.getTs());
    }

    /**
     * Prints every incoming element together with the operator's current watermark.
     * Nothing is emitted downstream.
     */
    private static final class WatermarkPrinter
        extends KeyedProcessFunction<String, WaterSensor, WaterSensor>
    {
        @Override
        public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
            long currentWm = ctx.timerService().currentWatermark();
            // Message reads: "<element> arrived, current watermark of process: <wm>".
            System.out.println(value + "到达时,此时process的WM:" + currentWm);
        }
    }
}
